package etcd

import (
	"context"
	"errors"
	"fmt"
	"gitee.com/zackeus/go-boot/cron/drive"
	"gitee.com/zackeus/go-boot/tools/errorx"
	"gitee.com/zackeus/go-boot/tools/system"
	"gitee.com/zackeus/go-zero/core/logx"
	"gitee.com/zackeus/go-zero/core/threading"
	"gitee.com/zackeus/goutil/strutil"
	"github.com/google/uuid"
	"go.etcd.io/etcd/api/v3/mvccpb"
	"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
	"google.golang.org/grpc/connectivity"
	"sync"
	"sync/atomic"
	"time"
)

const (
	defaultPfx     = "cron/"
	defaultLockKey = "lock"
	defaultTTl     = 15
	defaultTimeout = 3 * time.Second
)

type EtcdDriver struct {
	once        sync.Once
	mu          sync.RWMutex
	nodeID      string
	serviceName string

	cli     *clientv3.Client
	nodes   *sync.Map
	eventCh chan<- *drive.Event

	ttl                 int64
	leaseID             clientv3.LeaseID
	leaseCh             <-chan *clientv3.LeaseKeepAliveResponse
	reKeepAliveInterval time.Duration // 重新续约间隔
	leaseAlive          atomic.Bool   // 租约是否存活

	ctx    context.Context
	logger logx.Logger
	cancel context.CancelFunc

	prefix  string        // 前缀
	timeout time.Duration // 操作超时
}

// NewDriver NewEtcdDriver
func NewDriver(serverName string, cli *clientv3.Client, opts ...Options) *EtcdDriver {
	e := &EtcdDriver{
		serviceName:         serverName,
		cli:                 cli,
		nodes:               &sync.Map{},
		ttl:                 defaultTTl,
		timeout:             defaultTimeout,
		reKeepAliveInterval: 3 * time.Second,
		prefix:              defaultPfx,
	}
	for _, opt := range opts {
		opt(e)
	}

	/* 优先使用容器ID */
	e.nodeID = system.PodID()
	if strutil.IsBlank(e.nodeID) {
		e.nodeID = uuid.New().String()
	}

	e.leaseAlive.Store(false)
	return e
}

func (e *EtcdDriver) key(key string) string {
	return fmt.Sprintf("%s%v", e.prefix, key)
}

func (e *EtcdDriver) Available() bool {
	if e.ctx == nil || e.ctx.Err() != nil {
		logx.Error("the cron driver has closed.")
		return false
	}
	if !e.leaseAlive.Load() {
		/* 租约无效 */
		logx.Alert(fmt.Sprintf("the cron node [%v] lease not alive.", e.nodeID))
		return false
	}
	return true
}

func (e *EtcdDriver) NodeID() string {
	return e.nodeID
}

func (e *EtcdDriver) GetNodes() (nodes []string, err error) {
	return e.getNodes(), nil
}

func (e *EtcdDriver) Start(ctx context.Context, eventCh chan<- *drive.Event) error {
	var gErr error
	e.once.Do(func() {
		e.eventCh = eventCh
		e.ctx, e.cancel = context.WithCancel(ctx)
		e.logger = logx.WithContext(e.ctx)

		/* 构建 session */
		session, err := concurrency.NewSession(e.cli, concurrency.WithTTL(int(e.ttl)))
		if err != nil {
			gErr = err
			return
		}
		defer func(session *concurrency.Session) {
			_ = session.Close()
		}(session)
		mu := concurrency.NewMutex(session, e.key(defaultLockKey))
		/* 加锁 */
		if gErr = mu.Lock(e.ctx); gErr != nil {
			return
		}
		/* 解锁 */
		defer func(mu *concurrency.Mutex, ctx context.Context) {
			_ = mu.Unlock(ctx)
		}(mu, e.ctx)

		/* 初始化 */
		if gErr = e.init(); gErr != nil {
			return
		}

		e.watcher()
		/* 注册租约 */
		if gErr = e.register(); gErr != nil {
			_ = e.Stop()
			return
		}

		/* 租约续期 */
		if gErr = e.keepAlive(); gErr != nil {
			_ = e.Stop()
			return
		}
	})
	return gErr
}

func (e *EtcdDriver) Stop() (err error) {
	e.revoke()
	e.cancel()
	return
}

// setServiceList 新增服务地址
func (e *EtcdDriver) setNode(key, val string) {
	e.nodes.Store(key, val)
}

// DelServiceList 删除服务地址
func (e *EtcdDriver) delNode(key string) {
	e.nodes.Delete(key)
}

// GetServices 获取服务地址
func (e *EtcdDriver) getNodes() []string {
	adds := make([]string, 0)
	e.nodes.Range(func(key, val any) bool {
		adds = append(adds, val.(string))
		return true
	})
	return adds
}

func (e *EtcdDriver) init() error {
	ctx, cancel := context.WithTimeout(e.ctx, e.timeout)
	defer cancel()

	/* 根据前缀获取现有的key */
	resp, err := e.cli.Get(ctx, e.key(e.serviceName), clientv3.WithPrefix())
	if err != nil {
		return err
	}
	for _, ev := range resp.Kvs {
		e.setNode(string(ev.Key), string(ev.Value))
	}
	return nil
}

// 注册租约
func (e *EtcdDriver) register() error {
	ctx, cancel := context.WithTimeout(e.ctx, e.timeout)
	defer cancel()

	lease := clientv3.NewLease(e.cli)
	/* 声明租约 */
	leaseGrant, err := lease.Grant(ctx, e.ttl)
	if err != nil {
		return err
	}

	nodeKey := e.key(fmt.Sprintf("%s/%v", e.serviceName, e.nodeID))
	/* 使用事务 先检查 workId 是否被占用 再绑定租约 */
	txResp, err := e.cli.Txn(ctx).
		If(clientv3.Compare(clientv3.LeaseValue(nodeKey), "=", 0)).
		Then(clientv3.OpPut(nodeKey, e.nodeID, clientv3.WithLease(leaseGrant.ID))).
		Commit()
	if err != nil {
		return err
	}
	/* 事务操作失败 当前 workId 被其他服务占用  */
	if !txResp.Succeeded {
		return errors.New(fmt.Sprintf("the workId [%v] reuse.", e.nodeID))
	}

	e.leaseID = leaseGrant.ID
	return nil
}

// watcher 监听服务列表
func (e *EtcdDriver) watcher() {
	threading.GoSafeCtx(e.ctx, func() {
		ch := e.cli.Watch(clientv3.WithRequireLeader(e.ctx), e.key(e.serviceName), clientv3.WithPrefix())
		for {
			select {
			case resp := <-ch:
				if resp.Err() != nil {
					// TODO 异常处理
					e.logger.Error("************** ", resp.Err())
					return
				}

				for _, ev := range resp.Events {
					switch ev.Type {
					case mvccpb.DELETE:
						{
							e.mu.Lock()
							if val, ok := e.nodes.Load(string(ev.Kv.Key)); ok {
								e.eventCh <- &drive.Event{Node: val.(string), Type: drive.DelNodeEvent}
							}
							e.delNode(string(ev.Kv.Key))
							e.mu.Unlock()
						}
					case mvccpb.PUT:
						{
							if ev.Kv == nil {
								continue
							}

							nodeKey, nodeVal := string(ev.Kv.Key), string(ev.Kv.Value)
							e.mu.Lock()
							if val, ok := e.nodes.Load(nodeKey); ok {
								if val.(string) == nodeVal {
									/* val值相同忽略 */
									e.mu.Unlock()
									continue
								} else {
									/* 值不同 发送删除节点 */
									e.eventCh <- &drive.Event{Node: val.(string), Type: drive.DelNodeEvent}
								}
							}

							e.setNode(nodeKey, nodeVal)
							e.eventCh <- &drive.Event{Node: nodeVal, Type: drive.AddNodeEvent}
							e.mu.Unlock()
						}
					default:
						continue
					}
				}
			case <-e.ctx.Done():
				return
			}
		}
	})
}

func (e *EtcdDriver) keepAlive() error {
	/* 自动续期 */
	ch, err := e.cli.KeepAlive(e.ctx, e.leaseID)
	if err != nil {
		return err
	}
	e.leaseAlive.Store(true)

	threading.GoSafeCtx(e.ctx, func() {
		for {
			select {
			case _, ok := <-ch:
				if !ok {
					/* 租约过期 可能网络连接断开 进行续期 */
					e.leaseAlive.Store(false)
					e.revoke()
					if err := e.reKeepAlive(); err != nil {
						e.logger.Errorf("cron driver KeepAlive: %s", err.Error())
					}
					return
				}
			case <-e.ctx.Done():
				return
			}
		}
	})
	return nil
}

// 自动续期
func (e *EtcdDriver) reKeepAlive() error {
	if e.ctx == nil || e.ctx.Err() != nil {
		return nil
	}

	ticker := time.NewTicker(e.reKeepAliveInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			/* 连接已关闭 */
			conn := e.cli.ActiveConnection()
			if conn == nil || conn.GetState() == connectivity.Shutdown {
				return nil
			}

			if err := e.register(); err != nil {
				e.logger.Error(errorx.Wrap(err, "cron driver register failed, will try again."))
				break
			}
			if err := e.keepAlive(); err != nil {
				e.logger.Error(errorx.Wrap(err, "cron driver keepAlive failed, will try again."))
				e.revoke()
				break
			}

			e.logger.Debugf("cron the nodeId [%v] reKeepAlive success", e.nodeID)
			return nil
		case <-e.ctx.Done():
			return nil
		}
	}
}

func (e *EtcdDriver) revoke() {
	ctx, cancel := context.WithTimeout(e.ctx, e.timeout)
	defer cancel()

	_, err := e.cli.Lease.Revoke(ctx, e.leaseID)
	if err != nil && !errors.Is(err, rpctypes.ErrLeaseNotFound) {
		e.logger.Errorf("lease revoke error: %v", err)
	}
}
